added some development tools
[windows-sources.git] / developer / Samples / NET Standard / ParallelExtensionsExtras_Standard / CoordinationDataStructures / AsyncCoordination / AsyncCall.cs
blob58f3196fffa92953487d1b6664c619680d33c14a
1 //--------------------------------------------------------------------------
2 //
3 // Copyright (c) Microsoft Corporation. All rights reserved.
4 //
5 // File: AsyncCall.cs
6 //
7 //--------------------------------------------------------------------------
9 using System.Collections.Concurrent;
10 using System.Collections.Generic;
11 using System.Diagnostics;
13 namespace System.Threading.Tasks
15 /// <summary>Asynchronously invokes a handler for every posted item.</summary>
16 /// <typeparam name="T">Specifies the type of data processed by the instance.</typeparam>
17 public sealed class AsyncCall<T> : MarshalByRefObject
19 /// <summary>
20 /// A queue that stores the posted data. Also serves as the syncObj for protected instance state.
21 /// A ConcurrentQueue is used to enable lock-free dequeues while running with a single consumer task.
22 /// </summary>
23 private readonly ConcurrentQueue<T> _queue;
24 /// <summary>The delegate to invoke for every element.</summary>
25 private readonly Delegate _handler;
26 /// <summary>The maximum number of items that should be processed by an individual task.</summary>
27 private readonly int _maxItemsPerTask;
28 /// <summary>The TaskFactory to use to launch new tasks.</summary>
29 private readonly TaskFactory _tf;
30 /// <summary>The options to use for parallel processing of data.</summary>
31 private readonly ParallelOptions _parallelOptions;
32 /// <summary>Whether a processing task has been scheduled.</summary>
33 private int _processingCount;
35 /// <summary>Initializes the AsyncCall with an action to execute for each element.</summary>
36 /// <param name="actionHandler">The action to run for every posted item.</param>
37 /// <param name="maxDegreeOfParallelism">The maximum degree of parallelism to use. If not specified, 1 is used for serial execution.</param>
38 /// <param name="scheduler">The scheduler to use. If null, the default scheduler is used.</param>
39 /// <param name="maxItemsPerTask">The maximum number of items to be processed per task. If not specified, Int32.MaxValue is used.</param>
40 public AsyncCall(Action<T> actionHandler, int maxDegreeOfParallelism = 1, int maxItemsPerTask = Int32.MaxValue, TaskScheduler scheduler = null) :
41 this(maxDegreeOfParallelism, maxItemsPerTask, scheduler)
43 if (actionHandler == null) throw new ArgumentNullException("handler");
44 _handler = actionHandler;
47 /// <summary>
48 /// Initializes the AsyncCall with a function to execute for each element. The function returns an Task
49 /// that represents the asynchronous completion of that element's processing.
50 /// </summary>
51 /// <param name="functionHandler">The function to run for every posted item.</param>
52 /// <param name="maxDegreeOfParallelism">The maximum degree of parallelism to use. If not specified, 1 is used for serial execution.</param>
53 /// <param name="scheduler">The scheduler to use. If null, the default scheduler is used.</param>
54 public AsyncCall(Func<T,Task> functionHandler, int maxDegreeOfParallelism = 1, TaskScheduler scheduler = null) :
55 this(maxDegreeOfParallelism, 1, scheduler)
57 if (functionHandler == null) throw new ArgumentNullException("handler");
58 _handler = functionHandler;
61 /// <summary>General initialization of the AsyncCall. Another constructor must initialize the delegate.</summary>
62 /// <param name="maxDegreeOfParallelism">The maximum degree of parallelism to use. If not specified, 1 is used for serial execution.</param>
63 /// <param name="maxItemsPerTask">The maximum number of items to be processed per task. If not specified, Int32.MaxValue is used.</param>
64 /// <param name="scheduler">The scheduler to use. If null, the default scheduler is used.</param>
65 private AsyncCall(int maxDegreeOfParallelism = 1, int maxItemsPerTask = Int32.MaxValue, TaskScheduler scheduler = null)
67 // Validate arguments
68 if (maxDegreeOfParallelism < 1) throw new ArgumentOutOfRangeException("maxDegreeOfParallelism");
69 if (maxItemsPerTask < 1) throw new ArgumentOutOfRangeException("maxItemsPerTask");
70 if (scheduler == null) scheduler = TaskScheduler.Default;
72 // Configure the instance
73 _queue = new ConcurrentQueue<T>();
74 _maxItemsPerTask = maxItemsPerTask;
75 _tf = new TaskFactory(scheduler);
76 if (maxDegreeOfParallelism != 1)
78 _parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism, TaskScheduler = scheduler };
82 /// <summary>Post an item for processing.</summary>
83 /// <param name="item">The item to be processed.</param>
84 public void Post(T item)
86 lock (_queue)
88 // Add the item to the internal queue
89 _queue.Enqueue(item);
91 // Check to see whether the right number of tasks have been scheduled.
92 // If they haven't, schedule one for this new piece of data.
93 if (_handler is Action<T>)
95 if (_processingCount == 0)
97 _processingCount = 1;
98 _tf.StartNew(ProcessItemsActionTaskBody);
101 else if (_handler is Func<T, Task>)
103 if (_processingCount == 0 || // is anyone at all currently processing?
104 (_parallelOptions != null && _processingCount < _parallelOptions.MaxDegreeOfParallelism && // are enough workers currently processing?
105 !_queue.IsEmpty)) // and, as an optimization, double check to make sure the item hasn't already been picked up by another worker
107 _processingCount++;
108 _tf.StartNew(ProcessItemFunctionTaskBody, null);
111 else Debug.Fail("_handler is an invalid delegate type");
115 /// <summary>Gets an enumerable that yields the items to be processed at this time.</summary>
116 /// <returns>An enumerable of items.</returns>
117 private IEnumerable<T> GetItemsToProcess()
119 // Yield the next elements to be processed until either there are no more elements
120 // or we've reached the maximum number of elements that an individual task should process.
121 int processedCount = 0;
122 T nextItem;
123 while (processedCount < _maxItemsPerTask && _queue.TryDequeue(out nextItem))
125 yield return nextItem;
126 processedCount++;
130 /// <summary>Used as the body of an action task to process items in the queue.</summary>
131 private void ProcessItemsActionTaskBody()
135 // Get the handler
136 Action<T> handler = (Action<T>)_handler;
138 // Process up to _maxItemsPerTask items, either serially or in parallel
139 // based on the provided maxDegreeOfParallelism (which determines
140 // whether a ParallelOptions is instantiated).
141 if (_parallelOptions == null)
142 foreach (var item in GetItemsToProcess()) handler(item);
143 else
144 Parallel.ForEach(GetItemsToProcess(), _parallelOptions, handler);
146 finally
148 lock (_queue)
150 // If there are still items in the queue, schedule another task to continue processing.
151 // Otherwise, note that we're no longer processing.
152 if (!_queue.IsEmpty) _tf.StartNew(ProcessItemsActionTaskBody, TaskCreationOptions.PreferFairness);
153 else _processingCount = 0;
158 /// <summary>Used as the body of a function task to process items in the queue.</summary>
159 private void ProcessItemFunctionTaskBody(object ignored)
161 bool anotherTaskQueued = false;
164 // Get the handler
165 Func<T, Task> handler = (Func<T, Task>)_handler;
167 // Get the next item from the queue to process
168 T nextItem;
169 if (_queue.TryDequeue(out nextItem))
171 // Run the handler and get the follow-on task.
172 // If we got a follow-on task, run this process again when the task completes.
173 // If we didn't, just start another task to keep going now.
174 var task = handler(nextItem);
175 if (task != null) task.ContinueWith(ProcessItemFunctionTaskBody, _tf.Scheduler);
176 else _tf.StartNew(ProcessItemFunctionTaskBody, null);
178 // We've queued a task to continue processing, which means that logically
179 // we're still maintaining the same level of parallelism.
180 anotherTaskQueued = true;
183 finally
185 // If we didn't queue up another task to continue processing (either
186 // because an exception occurred, or we failed to grab an item from the queue)
187 if (!anotherTaskQueued)
189 lock (_queue)
191 // Verify that there's still nothing in the queue, now under the same
192 // lock that the queuer needs to take in order to increment the processing count
193 // and launch a new processor.
194 if (!_queue.IsEmpty) _tf.StartNew(ProcessItemFunctionTaskBody, null);
195 else _processingCount--;
202 /// <summary>Provides static factory methods for creating AsyncCall(Of T) instances.</summary>
203 public static class AsyncCall
205 /// <summary>Initializes the AsyncCall with an action to execute for each element.</summary>
206 /// <param name="actionHandler">The action to run for every posted item.</param>
207 /// <param name="maxDegreeOfParallelism">The maximum degree of parallelism to use. If not specified, 1 is used for serial execution.</param>
208 /// <param name="scheduler">The scheduler to use. If null, the default scheduler is used.</param>
209 /// <param name="maxItemsPerTask">The maximum number of items to be processed per task. If not specified, Int32.MaxValue is used.</param>
210 public static AsyncCall<T> Create<T>(Action<T> actionHandler, int maxDegreeOfParallelism = 1, int maxItemsPerTask = Int32.MaxValue, TaskScheduler scheduler = null)
212 return new AsyncCall<T>(actionHandler, maxDegreeOfParallelism, maxItemsPerTask, scheduler);
215 /// <summary>
216 /// Initializes the AsyncCall with a function to execute for each element. The function returns an Task
217 /// that represents the asynchronous completion of that element's processing.
218 /// </summary>
219 /// <param name="functionHandler">The function to run for every posted item.</param>
220 /// <param name="maxDegreeOfParallelism">The maximum degree of parallelism to use. If not specified, 1 is used for serial execution.</param>
221 /// <param name="maxItemsPerTask">The maximum number of items to be processed per task. If not specified, Int32.MaxValue is used.</param>
222 /// <param name="scheduler">The scheduler to use. If null, the default scheduler is used.</param>
223 public static AsyncCall<T> Create<T>(Func<T, Task> functionHandler, int maxDegreeOfParallelism = 1, TaskScheduler scheduler = null)
225 return new AsyncCall<T>(functionHandler, maxDegreeOfParallelism, scheduler);
228 /// <summary>Initializes the AsyncCall in the specified AppDomain with an action to execute for each element.</summary>
229 /// <param name="actionHandler">The action to run for every posted item.</param>
230 /// <param name="maxDegreeOfParallelism">The maximum degree of parallelism to use. If not specified, 1 is used for serial execution.</param>
231 /// <param name="maxItemsPerTask">The maximum number of items to be processed per task. If not specified, Int32.MaxValue is used.</param>
232 public static AsyncCall<T> CreateInTargetAppDomain<T>(AppDomain targetDomain, Action<T> actionHandler, int maxDegreeOfParallelism = 1, int maxItemsPerTask = Int32.MaxValue)
234 return (AsyncCall<T>)targetDomain.CreateInstanceAndUnwrap(
235 typeof(AsyncCall<T>).Assembly.FullName, typeof(AsyncCall<T>).FullName,
236 false, Reflection.BindingFlags.CreateInstance, null,
237 new object[] { actionHandler, maxDegreeOfParallelism, maxItemsPerTask, null },
238 null, null);
241 /// <summary>
242 /// Initializes the AsyncCall in the specified AppDomain with a function to execute for each element.
243 /// The function returns an Task that represents the asynchronous completion of that element's processing.
244 /// </summary>
245 /// <param name="functionHandler">The action to run for every posted item.</param>
246 /// <param name="maxDegreeOfParallelism">The maximum degree of parallelism to use. If not specified, 1 is used for serial execution.</param>
247 public static AsyncCall<T> CreateInTargetAppDomain<T>(AppDomain targetDomain, Func<T, Task> functionHandler, int maxDegreeOfParallelism = 1)
249 return (AsyncCall<T>)targetDomain.CreateInstanceAndUnwrap(
250 typeof(AsyncCall<T>).Assembly.FullName, typeof(AsyncCall<T>).FullName,
251 false, Reflection.BindingFlags.CreateInstance, null,
252 new object[] { functionHandler, maxDegreeOfParallelism, null },
253 null, null);